package com.spring.kafka.controller;

import com.alibaba.fastjson.JSONObject;
import com.spring.kafka.model.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author liuzhiqiang
 */
@RestController
@Slf4j
public class TestController {
    // topic的名称
    private final static String TOPIC_NAME = "userTopic";
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaRegistry;


    @GetMapping("/push")
    public String push() {
        User user = new User();
        user.setName("刘志强");
        user.setAge(15);
        user.setBirthday(100000);
        kafkaTemplate.send(TOPIC_NAME, JSONObject.toJSONString(user));

        User user1 = new User();
        user1.setName("王妍");
        user1.setAge(8);
        user1.setBirthday(100000);
        kafkaTemplate.send(TOPIC_NAME, JSONObject.toJSONString(user1));
        return "ok";
    }

    @GetMapping("/pauseAndResume")
    public String pauseAndResume(Integer type) {
        if (type == 1) {
            // 暂停
            kafkaRegistry.getListenerContainer("testListenerId").pause();
        } else {
            // 恢复
            kafkaRegistry.getListenerContainer("testListenerId").resume();
        }
        return "ok";
    }

    @KafkaListener(id = "testListenerId", topics = {"test"}, groupId = "zhTestGroup")
    public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        log.info("收到消息：【{}】", value);
        //手动提交offset
        ack.acknowledge();
    }

    @KafkaListener(id = "userTopicId", topics = {"userTopic"}, groupId = "zhTestGroup")
    public void userTopic(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        log.info("userTopic收到消息：【{}】", value);
        //手动提交offset
        ack.acknowledge();
    }

    @KafkaListener(id = "aboveUserTopicId", topics = {"aboveUserTopic"}, groupId = "zhTestGroup")
    public void aboveUserTopic(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        log.info("10AboveUserTopic收到消息：【{}】", value);
        //手动提交offset
        ack.acknowledge();
    }
}
